Skip to main content

练习

练习

修改下面的代码。尝试传入不同的参数,将图片进行不同的处理。

调用同网段的摄像头

sender.py
import os
from importlib import import_module
from flask import Flask, render_template, Response,render_template_string
from io import BytesIO
import cv2
from PIL import ImageGrab, Image
import time
import threading
try:
from greenlet import getcurrent as get_ident
except ImportError:
try:
from thread import get_ident
except ImportError:
from _thread import get_ident


class CameraEvent(object):
def __init__(self):
self.events = {}

def wait(self):
ident = get_ident()
if ident not in self.events:
self.events[ident] = [threading.Event(), time.time()]
return self.events[ident][0].wait()

def set(self):
now = time.time()
remove = None
for ident, event in self.events.items():
if not event[0].isSet():
event[0].set()
event[1] = now
else:
if now - event[1] > 5:
remove = ident
if remove:
del self.events[remove]

def clear(self):
self.events[get_ident()][0].clear()


class BaseCamera(object):
thread = None
frame = None
last_access = 0
event = CameraEvent()

def __init__(self):
if BaseCamera.thread is None:
BaseCamera.last_access = time.time()

BaseCamera.thread = threading.Thread(target=self._thread)
BaseCamera.thread.start()

while self.get_frame() is None:
time.sleep(0)

def get_frame(self):
BaseCamera.last_access = time.time()

BaseCamera.event.wait()
BaseCamera.event.clear()

return BaseCamera.frame

@staticmethod
def frames():
raise RuntimeError('Must be implemented by subclasses.')

@classmethod
def _thread(cls):
print('Starting camera thread.')
frames_iterator = cls.frames()
for frame in frames_iterator:
BaseCamera.frame = frame
BaseCamera.event.set()
time.sleep(0)
if time.time() - BaseCamera.last_access > 10:
frames_iterator.close()
print('Stopping camera thread due to inactivity.')
break
BaseCamera.thread = None



class Camera(BaseCamera):
video_source = 0

@staticmethod
def set_video_source(source):
Camera.video_source = source

@staticmethod
def frames():
camera = cv2.VideoCapture(Camera.video_source)
if not camera.isOpened():
raise RuntimeError('Error')

while True:
image = ImageGrab.grab() # 获取屏幕数据
# w, h = image.size
image = image.resize((1366, 750)) # 图片缩放
output_buffer = BytesIO() # 创建二进制对象
image.save(output_buffer, format='JPEG', quality=100) # quality提升图片分辨率
frame = output_buffer.getvalue() # 获取二进制数据
yield frame # 生成器返回一张图片的二进制数据

app = Flask(__name__)


@app.route('/')
def index():
"""
视图函数
:return:
"""
return render_template_string('''<html>

<head>
<title>屏幕共享</title>
</head>

<body>
<img src="{{ url_for('video_feed') }}">
</body>

</html>''')


def gen(camera):
"""
流媒体发生器
"""
while True:
frame = camera.get_frame()

yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')


@app.route('/video_feed')
def video_feed():
"""流媒体数据"""
return Response(gen(Camera()),
mimetype='multipart/x-mixed-replace; boundary=frame')


if __name__ == '__main__':
ip_host = '127.0.0.1'# 本机ip地址
ip_host2 = '0.0.0.0'# 内网ip地址
app.run(threaded=True, host=ip_host2, port=80)
processing.py
import cv2

url_path = "http://192.168.7.70/video_feed"

cap = cv2.VideoCapture(url_path)

while True:
ret, frame = cap.read()
if not ret:
break
cv2.imshow("frame", frame)
cv2.waitKey(1)

cap.release()
cv2.destroyAllWindows()